#encoding=utf-8
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.classification import LogisticRegressionWithSGD, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import jieba
from pyspark import SparkConf, SparkContext
import sys
# Python 2 idiom: re-expose sys.setdefaultencoding (removed after interpreter
# startup) and switch the default string encoding to UTF-8 so the Chinese
# review text can flow through str/unicode boundaries without explicit decode.
reload(sys)
sys.setdefaultencoding('utf-8')
conf = SparkConf().setMaster("local").setAppName("My App")  # configure the application via a SparkConf object
sc = SparkContext(conf = conf)  # create the SparkContext from that SparkConf

# Positive reviews map to 1, negative reviews map to 0.
def ReduceRate(rate):
    """Collapse a 1-5 star rating into a binary sentiment label.

    Ratings >= 4 are positive (1), ratings <= 2 are negative (0).
    A neutral rating of 3 yields None; the pipeline filters out 3-star
    reviews before calling this, so None never reaches the model.
    """
    if rate >= 4:
        return 1
    if rate <= 2:
        return 0
    # The original fell off the end via `pass`, silently returning None.
    # Same behavior, but now the neutral case is explicit.
    return None

originData = sc.textFile('./star-comment.txt')
originDistinctData = originData.distinct()
rateDocument = originDistinctData.map(lambda line: line.split('\t')).filter(lambda line: len(line) == 2)
# 统计总的评分分布，(5,4)认为正面评价,(2,1)认为负面评价
fiveRateDocument = rateDocument.filter(lambda line: line[0] == '5')
fourRateDocument = rateDocument.filter(lambda line: line[0] == '4')
threeRateDocument = rateDocument.filter(lambda line: line[0] == '3')
twoRateDocument = rateDocument.filter(lambda line: line[0] == '2')
oneRateDocument = rateDocument.filter(lambda line: line[0] == '1')
print fiveRateDocument.count(), fourRateDocument.count(), threeRateDocument.count(), \
      twoRateDocument.count(), oneRateDocument.count()

# Merge positive and negative samples and repartition for efficiency.
# NOTE: RDDs are immutable -- repartition() returns a NEW RDD.  The original
# code called `negRateDocument.repartition(1)` / `allRateDocument.repartition(1)`
# and discarded the result, so both calls were no-ops; rebind instead.
negRateDocument = oneRateDocument.union(twoRateDocument).repartition(1)
temp_posRateDocument = fiveRateDocument.union(fourRateDocument)
# Balance the data set: take exactly as many positive reviews as there are
# negative ones.
posRateDocument = sc.parallelize(temp_posRateDocument.take(negRateDocument.count())).repartition(1)
allRateDocument = negRateDocument.union(posRateDocument).repartition(1)
# Split into parallel RDDs of binary labels and raw comment text.
rate = allRateDocument.map(lambda s: ReduceRate(int(s[0])))
document = allRateDocument.map(lambda s: s[1])
# -----------
def _tokenize(text):
    """Segment *text* with jieba full mode and return the token list."""
    return "/".join(jieba.cut(text, cut_all=True)).split("/")

# Tokenize every comment in the RDD with jieba full-mode segmentation.
words = document.map(_tokenize)
# Hash the tokens into fixed-size term-frequency vectors.
hashingTF = HashingTF()
tf = hashingTF.transform(words)
tf.cache()
# Re-weight term frequencies with TF-IDF for feature extraction.
idfModel = IDF().fit(tf)
tfidf = idfModel.transform(tf)
# Pair each label (0/1) with its feature vector as a LabeledPoint, then
# split 60% training / 40% test with a fixed seed for reproducibility.
zipped = rate.zip(tfidf)
data = zipped.map(lambda line: LabeledPoint(line[0], line[1]))
training, test = data.randomSplit([0.6, 0.4], seed=0)
# 训练朴素贝叶斯分类模型，得到 NBmodel 模型来预测测试集的文本特征向量
NBmodel = NaiveBayes.train(training, 1.0)
predictionAndLabel = test.map(lambda p: (NBmodel.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda x: 1.0 if x[0] == x[1] else 0.0).count() / test.count()
print "\naccuracy:", accuracy

yourDocument = input("输入待分类的评论：")
yourwords = "/".join(jieba.cut_for_search(yourDocument)).split("/")
yourtf = hashingTF.transform(yourwords)
yourtfidf = idfModel.transform(yourtf)
print 'NaiveBayes Model Predict:', NBmodel.predict(yourtfidf)

yourDocument = input("输入待分类的评论：")
yourwords = "/".join(jieba.cut_for_search(yourDocument)).split("/")
yourtf = hashingTF.transform(yourwords)
yourtfidf = idfModel.transform(yourtf)
print 'NaiveBayes Model Predict:', NBmodel.predict(yourtfidf)